feat: Python UDFs: per-session inlining toggle and strict refusal setting#1546
Open
timsaucer wants to merge 6 commits into
Open
feat: Python UDFs: per-session inlining toggle and strict refusal setting#1546timsaucer wants to merge 6 commits into
timsaucer wants to merge 6 commits into
Conversation
This was referenced May 15, 2026
…fusal
Adds a per-session toggle that turns inline Python UDF encoding on or
off, plus the supporting plumbing to make it usable through
pickle.dumps.
Codec layer:
* PythonLogicalCodec / PythonPhysicalCodec gain a python_udf_inlining
bool (default true) and a with_python_udf_inlining(enabled) builder.
Each try_encode_udf{,af,wf} short-circuits to inner when the toggle
is off; each try_decode_udf{,af,wf} that recognizes a DFPY* magic
on a strict codec returns a clean Execution error instead of
invoking cloudpickle.loads. The refusal message names the UDF and
the wire family so an operator can see at a glance whether to
re-encode the bytes or register the UDF on the receiver.
Session layer:
* PySessionContext::with_python_udf_inlining(enabled) returns a new
session whose stacked logical + physical codecs both carry the
toggle. The Arc<SessionState> is cloned (cheap), only the codec
pair is rebuilt, so registrations and config stay attached.
* SessionContext.with_python_udf_inlining(*, enabled) is the Python
wrapper. enabled is keyword-only because positional booleans at
the call site read as opaque.
Sender-side context:
* datafusion.ipc gains set_sender_ctx / get_sender_ctx /
clear_sender_ctx thread-locals. Expr.__reduce__ now consults
get_sender_ctx() to pick the codec for outbound pickles, which is
the only path through which a strict session affects pickle.dumps
(the protocol calls __reduce__ with no arguments). Without a
sender context the default codec is used.
Tests:
* test_pickle_expr.py picks up TestPythonUdfInliningToggle (covers
both directions of the toggle plus the explicit-ctx fast path),
TestWorkerCtxLifecycle (set/clear/threading), and
TestSenderCtxLifecycle.
* New test_pickle_multiprocessing.py + helpers exercise the full
driver -> worker round-trip on a multiprocessing.Pool with set_*_ctx
installed in the worker initializer.
* CI workflow gets a 30-minute timeout-minutes backstop so a hung
pickle worker can't block the matrix indefinitely.
User-guide docs and the runnable examples land in PR4 of this series.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
d680e12 to
14178db
Compare
Rewrite with_python_udf_inlining docstring for readability and remove references to /user-guide/io/distributing_work, which does not exist yet. Keep security warning inline as a .. warning:: Security block, matching the existing pattern in Expr.to_bytes / from_bytes / __reduce__. The central doc will land in a follow-on PR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per CLAUDE.md, every Python function needs a docstring example. Adds examples to with_python_udf_inlining, set_sender_ctx, clear_sender_ctx, and get_sender_ctx. Also clarifies that with_python_udf_inlining returns a new SessionContext and leaves the original unchanged, matching the with_logical_extension_codec pattern. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* codec: strict refusal routes through `read_framed_payload` so malformed inline bytes surface their own diagnostic; the "inlining is disabled" message now fires only when the payload would have decoded. * codec: add summary line above `PythonPhysicalCodec::with_python_udf_inlining` cross-link for rustdoc rendering. * expr: hoist `get_sender_ctx` import to module top; note that `__reduce__` also drives `copy.copy` / `copy.deepcopy`. * context: accept `with_python_udf_inlining` positionally or as kwarg (drop `*,`). * tests: replace size-ratio heuristic with semantic check for the `DFPYUDF` family prefix; switch single-batch closure test to `pool.apply`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- `SessionContext.with_python_udf_inlining` now keyword-only (`*, enabled`) to match the documented call style and the existing doctests/tests. - `refuse_if_inline` and the three `try_decode_python_*` decoders short- circuit on a `starts_with(family)` check before `Python::attach`, so plans whose UDFs are not Python-defined no longer pay a GIL acquisition per decode call. Semantics preserved: `strip_wire_header` already returns `Ok(None)` when the prefix does not match. - `datafusion.ipc` module docstring wraps the `set_sender_ctx` example in `try`/`finally` and notes that the thread-local holds a strong reference to the installed `SessionContext` until cleared. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Addresses part of #1517
This is PR 3 of 4. The four PRs stack sequentially on top of this one; subsequent PRs target this branch's tip until it merges.
Follow up PR:
Rationale for this change
PRs 1 and 2 ship Python UDFs inline through the codec. There is a follow-on need: Untrusted-input decoding. A receiver that may read
Expr.from_bytesinput from an untrusted source must refuse to invokecloudpickle.loadson the inline payload. (pickle.loadson untrusted input is still unsafe regardless of this toggle — see the security note in the docstrings.)We resolve this by an on/off switch at the codec level. The codec already sits on every session, so the toggle is naturally per-session.
What changes are included in this PR?
SessionContextlevel to enable/disable Python inlining of UDFs, which gets passed through to the codec layer.Are there any user-facing changes?
Yes, but it's pure addition:
SessionContext.with_python_udf_inliningis a new method.datafusion.ipc.set_sender_ctx/get_sender_ctx/clear_sender_ctxare new functions for propagating a configured session throughpickle.dumps.The user-guide page documenting the full pattern (and the multiprocessing / Ray runnable examples) lands in PR 4 of this series.